package com.bd08.flink.demo

import org.apache.flink.streaming.api.functions.source.SourceFunction

/**
  * Demo Flink source that emits random integers at random intervals.
  *
  * Each iteration sleeps for a random whole number of seconds between 1 and 5
  * (inclusive) and then emits a random integer between 1 and 100 (inclusive).
  * Both the chosen sleep time and the generated value are printed to stdout.
  *
  * The source keeps producing until [[cancel]] is called.
  */
class MyRandomSource extends SourceFunction[Int] {

  // Written by cancel() (potentially from a different thread than the one
  // executing run()), hence @volatile so the emit loop observes the change.
  @volatile private var isRunning: Boolean = true

  /**
    * Emits random integers until the source is cancelled.
    *
    * @param sourceContext context used to emit the generated elements
    */
  override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit = {
    while (isRunning) {
      // Random pause of 1000..5000 ms, always a multiple of 1000.
      val sleepTimes = (Math.random() * 5 + 1).toInt * 1000
      println(s"gen sleeptime is $sleepTimes ..............")
      Thread.sleep(sleepTimes)

      // cancel() may have been requested while sleeping; do not emit afterwards.
      if (isRunning) {
        val data = (Math.random() * 100 + 1).toInt
        println(s"gen data is $data ..............")
        sourceContext.collect(data)
      }
    }
  }

  /**
    * Requests the emit loop in [[run]] to stop. The loop exits the next time it
    * checks the running flag (at the latest once the current sleep finishes).
    */
  override def cancel(): Unit = {
    isRunning = false
  }
}
